from kafka import KafkaAdminClient
from kafka.admin import NewTopic

from util.config_center import ConfigCenter


class KafkaAdminClientConnect:
    """kafka主用户连接"""

    def __init__(self):
        # 获取配置信息
        clusters = ConfigCenter.get_kafka_config()
        self.admin = KafkaAdminClient(bootstrap_servers=clusters)

    def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
        """
        创建topic
        :param topic_name: topic名
        :param num_partitions: 分区号
        :param replication_factor: 备份数
        :return: 创建结果
        """
        topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
        return self.admin.create_topics([topic], validate_only=False)

    def delete_topic(self, topic_name):
        """
        删除topic
        :param topic_name:
        :return:删除结果
        """
        return self.admin.delete_topics([topic_name])
